// Copyright 2024 - 2025 Crunchy Data Solutions, Inc.
//
// SPDX-License-Identifier: Apache-2.0

package collector

import (
	"context"
	"testing"

	"gotest.tools/v3/assert"

	"github.com/crunchydata/postgres-operator/internal/feature"
	"github.com/crunchydata/postgres-operator/internal/naming"
	"github.com/crunchydata/postgres-operator/internal/testing/require"
	"github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1"
)

// TestEnablePgBouncerLogging verifies the collector configuration that
// EnablePgBouncerLogging produces while the OpenTelemetryLogs feature gate is
// enabled. Each subtest renders the configuration with ToYAML and compares the
// output against a golden document.
func TestEnablePgBouncerLogging(t *testing.T) {
	// logsContext returns a context carrying a feature gate that has
	// OpenTelemetryLogs switched on.
	logsContext := func(t *testing.T) context.Context {
		gates := feature.NewGate()
		assert.NilError(t, gates.SetFromMap(map[string]bool{
			feature.OpenTelemetryLogs: true,
		}))
		return feature.NewContext(context.Background(), gates)
	}

	// The config is built from a nil spec and the cluster carries an empty
	// instrumentation section; the golden pipeline exports to "debug" only.
	t.Run("EmptyInstrumentationSpec", func(t *testing.T) {
		ctx := logsContext(t)
		cfg := NewConfig(nil)

		pgCluster := &v1beta1.PostgresCluster{}
		require.UnmarshalInto(t, &pgCluster.Spec, `{
			instrumentation: {}
		}`)

		EnablePgBouncerLogging(ctx, pgCluster, cfg, naming.PGBouncerFullLogPath)

		got, err := cfg.ToYAML()
		assert.NilError(t, err)
		assert.DeepEqual(t, got, `# Generated by postgres-operator. DO NOT EDIT.
# Your changes will not be saved.
exporters:
  debug:
    verbosity: detailed
extensions:
  file_storage/pgbouncer_logs:
    create_directory: false
    directory: /tmp/receiver
    fsync: true
processors:
  batch/1s:
    timeout: 1s
  batch/200ms:
    timeout: 200ms
  batch/logs:
    send_batch_size: 8192
    timeout: 200ms
  groupbyattrs/compact: {}
  resource/pgbouncer:
    attributes:
    - action: insert
      key: k8s.container.name
      value: pgbouncer
    - action: insert
      key: k8s.namespace.name
      value: ${env:K8S_POD_NAMESPACE}
    - action: insert
      key: k8s.pod.name
      value: ${env:K8S_POD_NAME}
    - action: insert
      key: process.executable.name
      value: pgbouncer
  resourcedetection:
    detectors: []
    override: false
    timeout: 30s
  transform/pgbouncer_logs:
    log_statements:
    - statements:
      - set(instrumentation_scope.name, "pgbouncer")
      - merge_maps(log.cache, ExtractPatterns(log.body, "^(?<timestamp>\\d{4}-\\d{2}-\\d{2}
        \\d{2}:\\d{2}:\\d{2}\\.\\d{3} [A-Z]{3}) \\[(?<pid>\\d+)\\] (?<log_level>[A-Z]+)
        (?<msg>.*$)"), "insert")
      - set(log.severity_text, log.cache["log_level"])
      - set(log.severity_number, SEVERITY_NUMBER_DEBUG)  where log.severity_text ==
        "NOISE" or log.severity_text == "DEBUG"
      - set(log.severity_number, SEVERITY_NUMBER_INFO)   where log.severity_text ==
        "LOG"
      - set(log.severity_number, SEVERITY_NUMBER_WARN)   where log.severity_text ==
        "WARNING"
      - set(log.severity_number, SEVERITY_NUMBER_ERROR)  where log.severity_text ==
        "ERROR"
      - set(log.severity_number, SEVERITY_NUMBER_FATAL)  where log.severity_text ==
        "FATAL"
      - set(log.time, Time(log.cache["timestamp"], "%F %T.%L %Z")) where IsString(log.cache["timestamp"])
      - set(log.attributes["log.record.original"], log.body)
      - set(log.attributes["process.pid"], log.cache["pid"])
      - set(log.body, log.cache["msg"])
receivers:
  filelog/pgbouncer_log:
    include:
    - /tmp/*.log
    - /tmp/*.log.1
    storage: file_storage/pgbouncer_logs
service:
  extensions:
  - file_storage/pgbouncer_logs
  pipelines:
    logs/pgbouncer:
      exporters:
      - debug
      processors:
      - resource/pgbouncer
      - transform/pgbouncer_logs
      - resourcedetection
      - batch/logs
      - groupbyattrs/compact
      receivers:
      - filelog/pgbouncer_log
`)
	})

	// Both the config and the cluster use testInstrumentationSpec(); the
	// golden document adds a "googlecloud" exporter and the logs pipeline
	// exports to it instead of "debug".
	t.Run("InstrumentationSpecDefined", func(t *testing.T) {
		ctx := logsContext(t)
		cfg := NewConfig(testInstrumentationSpec())

		pgCluster := &v1beta1.PostgresCluster{}
		pgCluster.Spec.Instrumentation = testInstrumentationSpec()

		EnablePgBouncerLogging(ctx, pgCluster, cfg, naming.PGBouncerFullLogPath)

		got, err := cfg.ToYAML()
		assert.NilError(t, err)
		assert.DeepEqual(t, got, `# Generated by postgres-operator. DO NOT EDIT.
# Your changes will not be saved.
exporters:
  debug:
    verbosity: detailed
  googlecloud:
    log:
      default_log_name: opentelemetry.io/collector-exported-log
    project: google-project-name
extensions:
  file_storage/pgbouncer_logs:
    create_directory: false
    directory: /tmp/receiver
    fsync: true
processors:
  batch/1s:
    timeout: 1s
  batch/200ms:
    timeout: 200ms
  batch/logs:
    send_batch_size: 8192
    timeout: 200ms
  groupbyattrs/compact: {}
  resource/pgbouncer:
    attributes:
    - action: insert
      key: k8s.container.name
      value: pgbouncer
    - action: insert
      key: k8s.namespace.name
      value: ${env:K8S_POD_NAMESPACE}
    - action: insert
      key: k8s.pod.name
      value: ${env:K8S_POD_NAME}
    - action: insert
      key: process.executable.name
      value: pgbouncer
  resourcedetection:
    detectors: []
    override: false
    timeout: 30s
  transform/pgbouncer_logs:
    log_statements:
    - statements:
      - set(instrumentation_scope.name, "pgbouncer")
      - merge_maps(log.cache, ExtractPatterns(log.body, "^(?<timestamp>\\d{4}-\\d{2}-\\d{2}
        \\d{2}:\\d{2}:\\d{2}\\.\\d{3} [A-Z]{3}) \\[(?<pid>\\d+)\\] (?<log_level>[A-Z]+)
        (?<msg>.*$)"), "insert")
      - set(log.severity_text, log.cache["log_level"])
      - set(log.severity_number, SEVERITY_NUMBER_DEBUG)  where log.severity_text ==
        "NOISE" or log.severity_text == "DEBUG"
      - set(log.severity_number, SEVERITY_NUMBER_INFO)   where log.severity_text ==
        "LOG"
      - set(log.severity_number, SEVERITY_NUMBER_WARN)   where log.severity_text ==
        "WARNING"
      - set(log.severity_number, SEVERITY_NUMBER_ERROR)  where log.severity_text ==
        "ERROR"
      - set(log.severity_number, SEVERITY_NUMBER_FATAL)  where log.severity_text ==
        "FATAL"
      - set(log.time, Time(log.cache["timestamp"], "%F %T.%L %Z")) where IsString(log.cache["timestamp"])
      - set(log.attributes["log.record.original"], log.body)
      - set(log.attributes["process.pid"], log.cache["pid"])
      - set(log.body, log.cache["msg"])
receivers:
  filelog/pgbouncer_log:
    include:
    - /tmp/*.log
    - /tmp/*.log.1
    storage: file_storage/pgbouncer_logs
service:
  extensions:
  - file_storage/pgbouncer_logs
  pipelines:
    logs/pgbouncer:
      exporters:
      - googlecloud
      processors:
      - resource/pgbouncer
      - transform/pgbouncer_logs
      - resourcedetection
      - batch/logs
      - groupbyattrs/compact
      receivers:
      - filelog/pgbouncer_log
`)
	})
}

// TestEnablePgBouncerMetrics verifies the collector configuration that
// EnablePgBouncerMetrics produces for the user "test_user" while the
// OpenTelemetryMetrics feature gate is enabled. Each subtest renders the
// configuration with ToYAML and compares the output against a golden document.
func TestEnablePgBouncerMetrics(t *testing.T) {
	// metricsContext returns a context carrying a feature gate that has
	// OpenTelemetryMetrics switched on.
	metricsContext := func(t *testing.T) context.Context {
		gates := feature.NewGate()
		assert.NilError(t, gates.SetFromMap(map[string]bool{
			feature.OpenTelemetryMetrics: true,
		}))
		return feature.NewContext(context.Background(), gates)
	}

	// The config is built from a nil spec and the cluster carries an empty
	// instrumentation section; the golden metrics pipeline exports to
	// "prometheus/cpk-monitoring" only.
	t.Run("EmptyInstrumentationSpec", func(t *testing.T) {
		ctx := metricsContext(t)
		cfg := NewConfig(nil)

		pgCluster := &v1beta1.PostgresCluster{}
		require.UnmarshalInto(t, &pgCluster.Spec, `{
			instrumentation: {}
		}`)

		EnablePgBouncerMetrics(ctx, pgCluster, cfg, "test_user")

		got, err := cfg.ToYAML()
		assert.NilError(t, err)
		assert.DeepEqual(t, got, `# Generated by postgres-operator. DO NOT EDIT.
# Your changes will not be saved.
exporters:
  debug:
    verbosity: detailed
  prometheus/cpk-monitoring:
    endpoint: 0.0.0.0:9187
extensions: {}
processors:
  batch/1s:
    timeout: 1s
  batch/200ms:
    timeout: 200ms
  batch/logs:
    send_batch_size: 8192
    timeout: 200ms
  groupbyattrs/compact: {}
  resourcedetection:
    detectors: []
    override: false
    timeout: 30s
receivers:
  sqlquery:
    datasource: host=localhost dbname=pgbouncer port=5432 user=test_user password=${env:PGPASSWORD}
    driver: postgres
    queries:
    - metrics:
      - attribute_columns:
        - database
        - user
        - state
        - application_name
        - link
        description: Current waiting time in seconds
        metric_name: ccp_pgbouncer_clients_wait_seconds
        value_column: wait
      sql: SHOW CLIENTS;
    - metrics:
      - attribute_columns:
        - name
        - port
        - database
        description: Maximum number of server connections
        metric_name: ccp_pgbouncer_databases_pool_size
        value_column: pool_size
      - attribute_columns:
        - name
        - port
        - database
        description: Minimum number of server connections
        metric_name: ccp_pgbouncer_databases_min_pool_size
        value_column: min_pool_size
      - attribute_columns:
        - name
        - port
        - database
        description: Maximum number of additional connections for this database
        metric_name: ccp_pgbouncer_databases_reserve_pool_size
        value_column: reserve_pool_size
      - attribute_columns:
        - name
        - port
        - database
        description: Maximum number of allowed connections for this database, as set
          by max_db_connections, either globally or per database
        metric_name: ccp_pgbouncer_databases_max_connections
        value_column: max_connections
      - attribute_columns:
        - name
        - port
        - database
        description: Current number of connections for this database
        metric_name: ccp_pgbouncer_databases_current_connections
        value_column: current_connections
      - attribute_columns:
        - name
        - port
        - database
        description: 1 if this database is currently paused, else 0
        metric_name: ccp_pgbouncer_databases_paused
        value_column: paused
      - attribute_columns:
        - name
        - port
        - database
        description: 1 if this database is currently disabled, else 0
        metric_name: ccp_pgbouncer_databases_disabled
        value_column: disabled
      sql: SHOW DATABASES;
    - metrics:
      - attribute_columns:
        - list
        description: Count of items registered with pgBouncer
        metric_name: ccp_pgbouncer_lists_item_count
        value_column: items
      sql: SHOW LISTS;
    - metrics:
      - attribute_columns:
        - database
        - user
        description: Client connections that are either linked to server connections
          or are idle with no queries waiting to be processed
        metric_name: ccp_pgbouncer_pools_client_active
        value_column: cl_active
      - attribute_columns:
        - database
        - user
        description: Client connections that have sent queries but have not yet got
          a server connection
        metric_name: ccp_pgbouncer_pools_client_waiting
        value_column: cl_waiting
      - attribute_columns:
        - database
        - user
        description: Server connections that are linked to a client
        metric_name: ccp_pgbouncer_pools_server_active
        value_column: sv_active
      - attribute_columns:
        - database
        - user
        description: Server connections that are unused and immediately usable for
          client queries
        metric_name: ccp_pgbouncer_pools_server_idle
        value_column: sv_idle
      - attribute_columns:
        - database
        - user
        description: Server connections that have been idle for more than server_check_delay,
          so they need server_check_query to run on them before they can be used again
        metric_name: ccp_pgbouncer_pools_server_used
        value_column: sv_used
      sql: SHOW POOLS;
    - metrics:
      - attribute_columns:
        - database
        - user
        - state
        - application_name
        - link
        description: 1 if the connection will be closed as soon as possible, because
          a configuration file reload or DNS update changed the connection information
          or RECONNECT was issued
        metric_name: ccp_pgbouncer_servers_close_needed
        value_column: close_needed
      sql: SHOW SERVERS;
service:
  extensions: []
  pipelines:
    metrics/pgbouncer:
      exporters:
      - prometheus/cpk-monitoring
      processors:
      - batch/200ms
      - groupbyattrs/compact
      receivers:
      - sqlquery
`)
	})

	// Both the config and the cluster use testInstrumentationSpec(); the
	// golden document adds a "googlecloud" exporter and the metrics pipeline
	// exports to it after "prometheus/cpk-monitoring".
	t.Run("InstrumentationSpecDefined", func(t *testing.T) {
		ctx := metricsContext(t)
		cfg := NewConfig(testInstrumentationSpec())

		pgCluster := &v1beta1.PostgresCluster{}
		pgCluster.Spec.Instrumentation = testInstrumentationSpec()

		EnablePgBouncerMetrics(ctx, pgCluster, cfg, "test_user")

		got, err := cfg.ToYAML()
		assert.NilError(t, err)
		assert.DeepEqual(t, got, `# Generated by postgres-operator. DO NOT EDIT.
# Your changes will not be saved.
exporters:
  debug:
    verbosity: detailed
  googlecloud:
    log:
      default_log_name: opentelemetry.io/collector-exported-log
    project: google-project-name
  prometheus/cpk-monitoring:
    endpoint: 0.0.0.0:9187
extensions: {}
processors:
  batch/1s:
    timeout: 1s
  batch/200ms:
    timeout: 200ms
  batch/logs:
    send_batch_size: 8192
    timeout: 200ms
  groupbyattrs/compact: {}
  resourcedetection:
    detectors: []
    override: false
    timeout: 30s
receivers:
  sqlquery:
    datasource: host=localhost dbname=pgbouncer port=5432 user=test_user password=${env:PGPASSWORD}
    driver: postgres
    queries:
    - metrics:
      - attribute_columns:
        - database
        - user
        - state
        - application_name
        - link
        description: Current waiting time in seconds
        metric_name: ccp_pgbouncer_clients_wait_seconds
        value_column: wait
      sql: SHOW CLIENTS;
    - metrics:
      - attribute_columns:
        - name
        - port
        - database
        description: Maximum number of server connections
        metric_name: ccp_pgbouncer_databases_pool_size
        value_column: pool_size
      - attribute_columns:
        - name
        - port
        - database
        description: Minimum number of server connections
        metric_name: ccp_pgbouncer_databases_min_pool_size
        value_column: min_pool_size
      - attribute_columns:
        - name
        - port
        - database
        description: Maximum number of additional connections for this database
        metric_name: ccp_pgbouncer_databases_reserve_pool_size
        value_column: reserve_pool_size
      - attribute_columns:
        - name
        - port
        - database
        description: Maximum number of allowed connections for this database, as set
          by max_db_connections, either globally or per database
        metric_name: ccp_pgbouncer_databases_max_connections
        value_column: max_connections
      - attribute_columns:
        - name
        - port
        - database
        description: Current number of connections for this database
        metric_name: ccp_pgbouncer_databases_current_connections
        value_column: current_connections
      - attribute_columns:
        - name
        - port
        - database
        description: 1 if this database is currently paused, else 0
        metric_name: ccp_pgbouncer_databases_paused
        value_column: paused
      - attribute_columns:
        - name
        - port
        - database
        description: 1 if this database is currently disabled, else 0
        metric_name: ccp_pgbouncer_databases_disabled
        value_column: disabled
      sql: SHOW DATABASES;
    - metrics:
      - attribute_columns:
        - list
        description: Count of items registered with pgBouncer
        metric_name: ccp_pgbouncer_lists_item_count
        value_column: items
      sql: SHOW LISTS;
    - metrics:
      - attribute_columns:
        - database
        - user
        description: Client connections that are either linked to server connections
          or are idle with no queries waiting to be processed
        metric_name: ccp_pgbouncer_pools_client_active
        value_column: cl_active
      - attribute_columns:
        - database
        - user
        description: Client connections that have sent queries but have not yet got
          a server connection
        metric_name: ccp_pgbouncer_pools_client_waiting
        value_column: cl_waiting
      - attribute_columns:
        - database
        - user
        description: Server connections that are linked to a client
        metric_name: ccp_pgbouncer_pools_server_active
        value_column: sv_active
      - attribute_columns:
        - database
        - user
        description: Server connections that are unused and immediately usable for
          client queries
        metric_name: ccp_pgbouncer_pools_server_idle
        value_column: sv_idle
      - attribute_columns:
        - database
        - user
        description: Server connections that have been idle for more than server_check_delay,
          so they need server_check_query to run on them before they can be used again
        metric_name: ccp_pgbouncer_pools_server_used
        value_column: sv_used
      sql: SHOW POOLS;
    - metrics:
      - attribute_columns:
        - database
        - user
        - state
        - application_name
        - link
        description: 1 if the connection will be closed as soon as possible, because
          a configuration file reload or DNS update changed the connection information
          or RECONNECT was issued
        metric_name: ccp_pgbouncer_servers_close_needed
        value_column: close_needed
      sql: SHOW SERVERS;
service:
  extensions: []
  pipelines:
    metrics/pgbouncer:
      exporters:
      - prometheus/cpk-monitoring
      - googlecloud
      processors:
      - batch/200ms
      - groupbyattrs/compact
      receivers:
      - sqlquery
`)
	})
}
